package org.zjvis.datascience.spark

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.scalactic._
import org.zjvis.datascience.spark.algorithm.{AlgorithmEntryWrapper, BaseAlgorithm, DBscanAlgorithmV2, FPGrowthAlgorithm, IsolationForestAlgorithm, KmeansAlgorithm, LinearRegressionAlgorithm, LogisticRegressionAlgorithm, PcaAlgorithm, PrefixSpanAlgorithm, StatisticsAnomaly}
import spark.jobserver.SparkSessionJob
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem}

import scala.util.Try

/**
 * @author wangyizhong
 * @date 2020-12-02
 * @desc spark-jobserver scheduling adapter
 */
object AlgorithmAdaptor extends SparkSessionJob {

  /** Space-separated arguments forwarded to the algorithm entry point. */
  type JobData = Seq[String]
  /** Serialized algorithm result returned to the jobserver caller. */
  type JobOutput = String

  /**
   * Runs the requested algorithm against the jobserver-managed SparkSession.
   *
   * NOTE(review): the previous implementation called
   * `sparkSession.sparkContext.getConf.set("spark.serializer", ...)` and
   * `.registerKryoClasses(...)` here. `SparkContext.getConf` returns a *copy*
   * of the configuration, and Spark configuration cannot be changed once the
   * context is running, so those calls were dead code — Kryo was never
   * actually enabled by them. To use Kryo serialization, set
   * `spark.serializer` and `spark.kryo.classesToRegister` (listing the
   * algorithm classes) in the spark-jobserver context configuration instead,
   * so they take effect at context creation time.
   *
   * @param sparkSession shared session provided by spark-jobserver
   * @param runtime      jobserver runtime environment (not used here)
   * @param data         validated argument list produced by [[validate]]
   * @return             result string from [[AlgorithmEntryWrapper.entry]]
   */
  override def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput =
    AlgorithmEntryWrapper.entry(data.toArray, sparkSession)

  /**
   * Validates the job request: requires an `input.string` config entry and
   * splits it on single spaces into the argument sequence passed to [[runJob]].
   *
   * Any failure to read the config value (missing key, wrong type) is mapped
   * to a single validation problem rather than propagating the exception.
   *
   * @param sparkSession shared session provided by spark-jobserver (unused)
   * @param runtime      jobserver runtime environment (unused)
   * @param config       per-request job configuration
   * @return             `Good(args)` on success, `Bad` with one problem otherwise
   */
  override def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config):
  JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(Good(_))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
}
